package com.xyp.demo.hconcurrence.service.impl;

import com.xyp.demo.hconcurrence.mode.StoreOrder;
import com.xyp.demo.hconcurrence.mode.XRequest;
import com.xyp.demo.hconcurrence.remote.RemoteService;
import com.xyp.demo.hconcurrence.service.IOrderService;
import com.xyp.demo.hconcurrence.service.IPerformanceOrderService;
import com.xyp.demo.hconcurrence.utils.JsonMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * @author xuyuanpeng
 * @version 1.0
 * @date 2019-05-14 14:34
 */
@Service
public class PerformanceOrderService implements IPerformanceOrderService {
    @Autowired
    private RemoteService remoteService;

    //线程安全的队列
    LinkedBlockingDeque<XRequest> queue=new LinkedBlockingDeque<>();

    @Override
    public StoreOrder getOrderByCode(String code) throws ExecutionException, InterruptedException {
        XRequest request=new XRequest();
        request.setOrderCode(code);

        //JDK 新特性 存储线程完成结果，并可以分发回线程
        CompletableFuture<StoreOrder> future = new CompletableFuture<>();

        request.setFuture(future);

        queue.add(request);

        //阻塞中 等待原厂接口调用完成
        return future.get();
    }

    //如果声明了@Service、@Controller的类，首先会加载 @PostConstruct的类
    //可以理解为随着系统的启动而启动
    @PostConstruct
    public void init(){
        System.out.println("PerformanceOrderService>>>init");
        // Executors.newSingleThreadScheduledExecutor()=Executors.newScheduledThreadPool(1)
        ScheduledExecutorService scheduledExecutorService =
                Executors.newSingleThreadScheduledExecutor();
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                Integer size=queue.size();
                if(size==0){
                    return;
                }
                ArrayList<XRequest> reqList = new ArrayList<>();
                for(int i = 0 ;i < size; i++){
                    XRequest req = queue.poll();
                    reqList.add(req);
                }

                System.out.println("批量处理的数据量为："+size);
                //将list数据，处理成批量处理的参数，然后请求批量处理接口
                List<String> codeList = new ArrayList<>();
                for(XRequest request : reqList){
                    codeList.add(request.getOrderCode());
                }

                //根据批量参数  请求 批量处理方法
                List<StoreOrder> result = remoteService.getOrderListByCodeBatch(codeList);

                //将唯一标识，与相应结果进行对应
                Map<String,StoreOrder> bindData=new HashMap<>();
                for (StoreOrder storeOrder : result){
                    String code =storeOrder.getOrderCode();
                    bindData.put(code,storeOrder);
                }
                //通过XRequest 下发至对应线程
                for(XRequest request : reqList){
                    StoreOrder storeOrder=bindData.get(request.getOrderCode());

                    CompletableFuture<StoreOrder> future = request.getFuture();
                    future.complete(storeOrder);

                }
//              System.out.println(JsonMapper.toJsonString(result));
            }
        },0,5, TimeUnit.MILLISECONDS);
        //0 : 延时时间为0
        //10: 定时时间，每10ms，执行一次
        //TimeUnit.MILLISECONDS : 毫秒
    }
}
